package com.tpvlog.rpc.registry;

import com.tpvlog.rpc.core.ServiceMeta;
import com.tpvlog.rpc.core.utils.RpcServiceHelper;
import com.tpvlog.rpc.registry.loadbalancer.ServiceLoadBalancer;
import com.tpvlog.rpc.registry.loadbalancer.ZKConsistentHashLoadBalancer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;

/**
 * {@link RegistryService} implementation that stores RPC service instances in ZooKeeper
 * through Curator's service-discovery extension.
 *
 * <p>Instances are registered under {@link #ZK_BASE_PATH} using the service key built by
 * {@link RpcServiceHelper#buildServiceKey} as the service name.
 */
public class ZookeeperRegistryService implements RegistryService {
    /**
     * Base sleep time, in milliseconds, handed to the {@link ExponentialBackoffRetry} policy
     * that is used when the connection to the registry fails.
     */
    public static final int BASE_SLEEP_TIME_MS = 1000;

    /**
     * Maximum number of retries handed to the {@link ExponentialBackoffRetry} policy.
     */
    public static final int MAX_RETRIES = 3;

    /**
     * Root node path of the RPC services inside the registry.
     */
    public static final String ZK_BASE_PATH = "/rpc";

    /**
     * Underlying ZooKeeper client; kept so that {@link #destroy()} can close the connection.
     */
    private final CuratorFramework client;

    private final ServiceDiscovery<ServiceMeta> serviceDiscovery;

    /**
     * Connects to the registry and starts the Curator service-discovery helper.
     *
     * @param registryAddr ZooKeeper connection string passed to Curator
     * @throws Exception if the service-discovery helper cannot be built or started; the
     *                   client opened here is closed before the exception propagates
     */
    public ZookeeperRegistryService(String registryAddr) throws Exception {
        // 1. Connect to the ZooKeeper registry.
        CuratorFramework curator = CuratorFrameworkFactory.newClient(registryAddr,
                new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRIES));
        curator.start();

        // 2. Build the Curator-based service registration/discovery helper.
        ServiceDiscovery<ServiceMeta> discovery;
        try {
            JsonInstanceSerializer<ServiceMeta> serializer = new JsonInstanceSerializer<>(ServiceMeta.class);
            discovery = ServiceDiscoveryBuilder
                    .builder(ServiceMeta.class) // custom payload type
                    .client(curator)
                    .serializer(serializer)
                    .basePath(ZK_BASE_PATH)
                    .build();
            discovery.start();
        } catch (Exception e) {
            // Do not leak the already-started client when initialization fails half-way.
            curator.close();
            throw e;
        }

        this.client = curator;
        this.serviceDiscovery = discovery;
    }

    /**
     * Registers the given service instance in the registry.
     *
     * @param serviceMeta metadata of the service instance to publish
     * @throws Exception if the instance cannot be built or registered
     */
    @Override
    public void register(ServiceMeta serviceMeta) throws Exception {
        serviceDiscovery.registerService(buildInstance(serviceMeta));
    }

    /**
     * Removes the given service instance from the registry.
     *
     * @param serviceMeta metadata of the service instance to remove; it must carry the same
     *                    service, version, address and port that were used to register it
     * @throws Exception if the instance cannot be built or unregistered
     */
    @Override
    public void unRegister(ServiceMeta serviceMeta) throws Exception {
        serviceDiscovery.unregisterService(buildInstance(serviceMeta));
    }

    /**
     * Picks one instance of the named service.
     *
     * @param serviceName     service key to look up
     * @param invokerHashCode hash code of the caller, forwarded to the load balancer
     * @return the payload of the selected instance, or {@code null} when no instance is
     *         registered or the load balancer selects none
     * @throws Exception if the registry query fails
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"}) // ServiceLoadBalancer is used raw, as in the original code
    public ServiceMeta discovery(String serviceName, int invokerHashCode) throws Exception {
        Collection<ServiceInstance<ServiceMeta>> serviceInstances = serviceDiscovery.queryForInstances(serviceName);
        if (serviceInstances == null || serviceInstances.isEmpty()) {
            return null;
        }

        ServiceLoadBalancer balancer = new ZKConsistentHashLoadBalancer();
        // Select one service node through the consistent-hash load balancer.
        ServiceInstance<ServiceMeta> instance =
                (ServiceInstance<ServiceMeta>) balancer.select(new ArrayList<>(serviceInstances), invokerHashCode);
        return instance == null ? null : instance.getPayload();
    }

    /**
     * Closes the service-discovery helper and then the underlying ZooKeeper client.
     *
     * @throws IOException if closing the service-discovery helper fails; the client is
     *                     closed regardless
     */
    @Override
    public void destroy() throws IOException {
        try {
            serviceDiscovery.close();
        } finally {
            client.close();
        }
    }

    /**
     * Builds the Curator instance describing {@code serviceMeta}.
     *
     * <p>The instance id is derived from the service key, address and port instead of the
     * random id the builder assigns by default, so that {@link #register} and
     * {@link #unRegister} address the same registry node for the same metadata.
     */
    private ServiceInstance<ServiceMeta> buildInstance(ServiceMeta serviceMeta) throws Exception {
        String serviceKey = RpcServiceHelper.buildServiceKey(serviceMeta.getService(), serviceMeta.getVersion());
        String instanceId = serviceKey + "@" + serviceMeta.getAddress() + ":" + serviceMeta.getPort();

        return ServiceInstance.<ServiceMeta>builder()
                .id(instanceId)
                .name(serviceKey)
                .address(serviceMeta.getAddress())
                .port(serviceMeta.getPort())
                .payload(serviceMeta)
                .build();
    }
}
